from copy import deepcopy
from threading import Thread
import os
import os.path
import time
import pickle

from ma.commons.core.taskinprogress import *
from ma.commons.core.reducetipinfo import ReduceTIPInfo
from ma.commons.core.reducetaskrunner import ReduceTaskRunner
from ma.commons.core.processrunner import ProcessRunner
from .partitioner import Partitioner
from ma.utils import location
import ma.commons.core.inputfetchernoquery as inputfetchernoquery
import ma.commons.core.constants as constants
import ma.log
import ma.const


class ReduceTIP(TaskInProgress, Thread):
    """A TaskInProgress specialisation for reduce tasks.

    Runs as its own thread: it polls until every map output this reduce
    depends on has been shuffled (localized) to this node, launches the user
    reduce code in a separate process, logs any thresholded keys, marks the
    task complete and finally pushes the reduce output file to the DFS.
    """
    
    def __init__(self, nodetracker, reducetip_info = None, reduce_level=0, struct_id=constants.STRUCT_ID_DEFAULT):
        """arguments description:
        nodetracker is the tracker object this task will run under
        reducetip_info is an optional pre-fabricated ReduceTIPInfo object;
            it is deep-copied so the caller keeps ownership of its instance
        reduce_level is this task's level in a multi-level reduce (0 = first)
        struct_id defaults to constants.STRUCT_ID_DEFAULT
        """
        
        self.__log = ma.log.get_logger('ma.mapred')
        
        TaskInProgress.__init__(self, nodetracker)
        Thread.__init__(self)
        
        # key -> list_of_values dict the user function emits reduced tuples
        # into; it persists for the life of this ReduceTIP as the inputs are
        # read one by one, and is finally written out to a file
        self.key_value_dict = {}
        
        # overshadow the inherited TaskInProgressInfo object with a ReduceTIPInfo
        if reducetip_info is None:
            # a ReduceTIPInfo with default settings, not affiliated to any job or task
            self.tip_info = ReduceTIPInfo(-1, -1, None)
        else:
            self.tip_info = deepcopy(reducetip_info)
        
        # by default the reduce level of any reduce task is 0, the first level
        self.reduce_level = reduce_level
        self.struct_id = struct_id
        
        user_process_id = self.tip_info.returnTaskID()
        
        # configuration of the user-supplied reduce code and the runner
        # program that will host it
        user_reduce_module = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_module_name, self.tip_info.job_id)
        user_reduce_class = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_class_name, self.tip_info.job_id)
        user_runner_program = location.get_src_ma_path() + ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_mr_redtask_runner, self.tip_info.job_id)
        
        # file through which pickled data is exchanged with the reduce
        # process (it also carries the reduce compute misc. output back)
        proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, self.tip_info.job_id, self.tip_info.task_id)
        ProcessRunner.store_pickled_data([self.tip_info], proc_temp_filename, self.tip_info.job_id)
        self.process_runner = ProcessRunner(user_runner_program, [user_reduce_module, user_reduce_class, self.tip_info.job_id, self.tip_info.task_id], user_process_id)

        # one slot per map input of this job, index denoting map task_id:
        # 0 = not done, 1 = done but not shuffled, 2 = done and shuffled
        self.completed_shuffled_maps_list = [0] * len(self.tip_info.input_data)
        #self.no_of_maps = ma.const.JobsXmlData.get_int_data(ma.const.xml_no_maps, self.tip_info.job_id)
        
        # temp list housing maps completed for the job
        self.temp_completed_maps_list = []
        
        self.launched = False
    
    
    def localizeTask(self, maps_complete):
        """Shuffle the given completed inputs over to this node.

        Reduce intermediate input is local to the NTs that produced it: map
        output for a reduce level 0 task, or reduce level x-1 output for a
        reduce level x task.  Called when a broadcast is received from
        another nodetracker saying that input for this task has been
        successfully stored locally there for a map task or a reduce.
        
        1. talk to the master or the hdfs and ask which tasks you must pick your input from
        2.a. (master) pick up a map output file from every NT giving the reduce tasks hash value 
        2.b. (hdfs) pick up the map and reduce tasks ids from which you must pick your input   
          i. you broadcast that you want output of these tasks and who so ever has them responds
          ii. start a TCP client, you tcp the files from these nodes.
          
        maps_complete is a subset of tip_info.input_data.  Returns True when
        every requested input was fetched, False otherwise.
        """
        
        # create the input fetcher for shuffling data
        self.input_fetcher = inputfetchernoquery.InputFetcherNoQuery(self.nodetracker.output_server, self.nodetracker, inputfetchernoquery.InputFetcherNoQuery.MAP_REDUCE_TYPE)
        
        remaining_inputs = self.input_fetcher.fetch_input(maps_complete, self.tip_info.task_id, self.tip_info.job_id)
        
        # a non-empty remainder means some inputs could not be retrieved
        if remaining_inputs:
            self.__log.error('Was not able to retrieve all reduce inputs: remaining %s', str(remaining_inputs))
            return False
        
        self.__log.info('Copied all map outputs for reduce-id %d, job-id %d: %s', self.tip_info.task_id, self.tip_info.job_id, str(remaining_inputs))
        return True
    
    
    def launchTask(self):
        """Launch the user-provided TaskRunner process, wait for it to end
        and return its result.  Also registers a timer that keeps sending
        health pings to the nodetracker while the task runs.
        """
        
        self.launched = True
        
        arglist = [self.tip_info.job_id, self.tip_info.task_id, constants.REDUCE, self]
        self.pingtimer_id = self.nodetracker.timer.addTimer(self.task_ping_interval, self.nodetracker.healthPingFromTask, arglist)
        
        # bracket the user process run with start/end timestamps
        self.tip_info.start_time = time.time()
        
        # start the process ... and wait till it ends
        ret = self.process_runner.start_and_end_process()
        
        self.tip_info.end_time = time.time()
        
        return ret
    
    
    def taskComplete(self):
        """When the reduce task is complete, clean up its local input files
        and inform the node tracker so that it can mark on the HDFS that the
        reduce output is ready as input to subsequent reduce tasks.
        """
        
        # do the cleanup, delete the (now consumed) shuffled input files
        partitioner = Partitioner(self.tip_info.job_id)
        hash_value = partitioner.taskIdToHash(self.tip_info.task_id)
        input_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, self.tip_info.job_id) + os.sep
        
        for input_data in self.tip_info.input_data:
            # NOTE(review): input_data appears to be a tuple whose [0] and [2]
            # fields identify the producing job/task (defined elsewhere);
            # os.remove raises OSError if the file is already gone -- confirm
            # that every input file is guaranteed to exist at this point
            input_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_map_output_filename_with_key, input_data[0], input_data[2], hash_value)
            os.remove(os.path.join(input_dest_dir, input_filename))
        
        self.__log.info('Reduce task %d of job %d completed processing', self.tip_info.task_id, self.tip_info.job_id)
        
        self.nodetracker.taskComplete(self.tip_info)
    
    
    def newlyCompletedMaps(self):
        """Return the maps, out of those still waiting to be shuffled, that
        the nodetracker now reports as completed for this job.

        Each returned entry is a (job_id, constants.MAP, map_task_id,
        location) tuple, where location is taken from the nodetracker's
        completed-maps record.
        """
        
        new_maps_list = []
        
        # completed maps the nodetracker knows about for this job
        completed_maps_list = self.nodetracker.completed_maps[self.tip_info.job_id]
        
        # iterate through the whole maps-to-be-shuffled list; an input entry's
        # [2] field is matched against the completed record's [0] field
        for pending_map in self.maps_to_be_shfld:
            matches = [entry for entry in completed_maps_list if entry[0] == pending_map[2]]
            if matches:
                new_maps_list.append((self.tip_info.job_id, constants.MAP, pending_map[2], matches[0][1]))
        
        return new_maps_list
    
    
    def log_thresholded_keys(self):
        """Write every key which crossed the threshold during the reduce
        computation to the 'ma.thresholder' logger, with its time of
        thresholding.
        """
        
        thresholder_log = ma.log.get_logger('ma.thresholder')
        
        # one comma-separated record per key, fields in the fixed order
        # [1],[0],[2] of the thresholded-key tuple
        for thresholded_key in self.list_of_thresholded_keys:
            thresholder_log.info(str(thresholded_key[1]) + ',' + str(thresholded_key[0]) + ',' + str(thresholded_key[2]))
    
    
    def run(self):
        """Thread body: the whole flow of this ReduceTIP.

        1. poll until every input map is completed and shuffled in
        2. launch the user reduce process and wait for it
        3. on success, log thresholded keys and mark the task complete
        4. write the output file to the DFS
        5. always remove the health-ping timer, even when a step after the
           launch raises, so the nodetracker registers that the task is dead
        """
        
        self.__log.info('Started ReduceTIP for %s', self.tip_info.returnTaskID())
        
        # private working copy of the input data; shrinks as maps get shuffled
        self.maps_to_be_shfld = list(self.tip_info.input_data)
        
        # polling interval, derived from the NT->master heartbeat interval
        self.sleep_interval = ma.const.XmlData.get_float_data(ma.const.xml_nt_master_heartbeat_interval) * 2
        
        while self.maps_to_be_shfld:
            # find done maps which are in this RTip's maps-to-be-shuffled list
            newly_completed_maps = self.newlyCompletedMaps()
            
            # localize these new maps; on failure kill the task and bail out
            if not self.localizeTask(newly_completed_maps):
                self.nodetracker.killTask(self.tip_info.job_id, constants.REDUCE, self.tip_info.task_id)
                self.__log.error('Couldn\'t shuffle all maps for Reduce %d', self.tip_info.task_id)
                raise IOError('The ReduceTIP couldn\'t shuffle data for Reduce %d' % self.tip_info.task_id)
            
            # take the newly completed maps out of the maps-to-be-shuffled
            # list.  NOTE(review): this assumes the 4-tuples rebuilt by
            # newlyCompletedMaps compare equal to the original input_data
            # entries -- verify against the input_data format
            for shuffled_map in newly_completed_maps:
                self.maps_to_be_shfld.remove(shuffled_map)
            
            time.sleep(self.sleep_interval)
        
        # all maps shuffled: launch and finish the task.  The try/finally
        # guarantees the ping timer registered by launchTask is removed even
        # if a later step raises (the original code leaked it in that case)
        try:
            if self.launchTask():
                # get the filename which holds the reduce compute misc. output
                proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, self.tip_info.job_id, self.tip_info.task_id)
                
                # get the pickled list data written back by the reduce process
                pickled_data = ProcessRunner.pull_pickled_data(proc_temp_filename, self.tip_info.job_id)
                
                # list of keys crossing threshold during computation; if the
                # Reduce function was not thresholdable an empty list is
                # returned, so no special casing is needed
                self.list_of_thresholded_keys = pickled_data[0]
                
                # log the values which were thresholded after this computation
                self.log_thresholded_keys()
                
                # marks task as complete
                self.taskComplete()
            else:
                # TODO: Called failed task
                self.__log.error('Reduce task %d of job %d failed while processing', self.tip_info.task_id, self.tip_info.job_id)
            
            # write the file to the DFS
            self.writeOutputToDfs()
        finally:
            # the user code runs in a separate process and if it dies before
            # completion we must still remove the ping timer so that the
            # health pings stop and the node tracker registers that the task
            # is dead.  Guard against addTimer itself having failed, in which
            # case pingtimer_id was never set
            pingtimer_id = getattr(self, 'pingtimer_id', None)
            if pingtimer_id is not None:
                self.nodetracker.timer.removeTimer(pingtimer_id)
    
    
    def writeOutputToDfs(self):
        """Copy the reduce output file (if present) from the local output
        temp dir to the job's output directory on the DFS, replacing any
        stale copy there.  Failures are logged, never raised.
        """
        
        self.__log.info("--> Attempting to copy Reduce %d output to DFS", self.tip_info.task_id)
        self.output_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, 
                                                                    self.tip_info.job_id)
        
        filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_output_filename, self.tip_info.job_id, self.tip_info.task_id)
        output_filepath = os.path.join(self.output_dest_dir, filename)
        
        dfs_output_dir = ma.const.JobsXmlData.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_output, self.tip_info.job_id)
        dfs_output_filepath = dfs_output_dir + ma.const.dfs_dir_sep + filename
        
        try:
            # copy file to hdfs if present; delete a possibly stale copy first
            if os.path.isfile(output_filepath):
                self.nodetracker.hdfs.delete_file(dfs_output_filepath, True)
                self.nodetracker.hdfs.copy_file_from_local(output_filepath, dfs_output_dir, self.tip_info.job_id)
            else:
                self.__log.error('File computed by Reduce %d is not present: %s', self.tip_info.task_id, output_filepath)
        except Exception as err_msg:
            # best effort: a DFS failure must not kill this thread
            self.__log.error('Reduce output file could not be written to the HDFS: %s', err_msg)
        